  
  package com.zs.pig.storm;  
import java.util.Map;
import java.util.Random;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import com.zs.pig.common.redis.test.RedisUtils;

import et.oschina.common.search.test.SnowflakeIdWorker;
  
/**
 * Storm bolt that stores the string carried in field 0 of every incoming
 * tuple in Redis via {@link RedisUtils}.
 *
 * <p>For each tuple a single id is obtained from a {@link SnowflakeIdWorker}
 * and the message is written twice under that id: once with
 * {@code set(id, message)} and once with {@code hset("blog", id, message)}.
 * The tuple is acked when both writes return normally and failed otherwise.
 *
 * <p>Originally created by comaple.zhang, 12-8-28 14:11.
 */
@SuppressWarnings("serial")
public class AllDatatoRedisBolt extends BaseRichBolt {

    /** Collector used to ack or fail tuples; assigned in {@link #prepare}. */
    private OutputCollector collector;

    /** Redis helper used for the writes; created in {@link #prepare}. */
    RedisUtils r;

    /** Not used by this class; kept so existing same-package references still compile. */
    Random rr;

    /**
     * Id generator, created once in {@link #prepare} and reused for every tuple
     * so that any state it keeps between {@code nextId()} calls is preserved.
     */
    private transient SnowflakeIdWorker idWorker;

    /**
     * Stores the collector and creates the per-instance helpers.
     *
     * @param stormConf the topology configuration (not read here)
     * @param context   the topology context (not read here)
     * @param collector the collector used later to ack or fail tuples
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        r = new RedisUtils();
        rr = new Random();
        // NOTE(review): both constructor arguments are hard-coded to 0 for every
        // instance of this bolt; if they are meant to identify the worker and
        // data center, parallel instances would share the same identity -- confirm.
        idWorker = new SnowflakeIdWorker(0, 0);
    }

    /**
     * Writes the tuple's first field to Redis and reports the outcome.
     *
     * <p>Exactly one of {@code ack} or {@code fail} is called per tuple: the
     * tuple is failed when the message is {@code null} or when any exception
     * is thrown while generating the id or writing, and acked otherwise.
     *
     * @param input the tuple whose field 0 holds the message to store
     */
    @Override
    public void execute(Tuple input) {
        try {
            String mesg = input.getString(0);
            if (mesg == null) {
                // A null message has nothing to store; send it down the failure path.
                throw new IllegalArgumentException("tuple field 0 is null; nothing to store");
            }
            System.out.println("redis....");

            // One id for both writes so the plain key and the "blog" hash field
            // for the same message can be correlated.
            String id = String.valueOf(idWorker.nextId());
            r.set(id, mesg);
            r.hset("blog", id, mesg);
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
            // Do not fall through to ack: a tuple must not be both failed and acked.
            return;
        }
        collector.ack(input);
    }

    /**
     * Declares the output fields {@code "id"} and {@code "data"}.
     *
     * <p>No {@code emit} call is present in this class, so the declaration
     * currently only describes the stream's schema.
     *
     * @param declarer the declarer that receives the field names
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "data"));
    }
}